"""
    celery 异步 测试
"""
import json

from cytasks.tasks import task1
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt


@csrf_exempt
def task_test(request):
    try:
        res = task1.apply_async()
        # print(res.get()) 等待执行结果
        return JsonResponse({"msg": "join task success","res":res.id})
    except Exception as ex:
        print(ex)
  
